1
//--------------------------------------------------------------------------
3 // Copyright (c) Microsoft Corporation. All rights reserved.
5 // File: ChunkPartitioner.cs
7 //--------------------------------------------------------------------------
9 using System
.Collections
.Generic
;
10 using System
.Threading
;
12 namespace System
.Collections
.Concurrent
.Partitioners
/// <summary>
/// Factory methods for partitioners that split an enumerable into chunks whose sizes are
/// governed by caller-supplied criteria.
/// </summary>
public static class ChunkPartitioner
{
    /// <summary>Builds a partitioner whose chunk sizes are chosen by a caller-supplied function.</summary>
    /// <typeparam name="TSource">The element type of the data being partitioned.</typeparam>
    /// <param name="source">The data to partition.</param>
    /// <param name="nextChunkSizeFunc">
    /// A function that receives the previous chunk size and returns the size of the next chunk.
    /// </param>
    /// <returns>A partitioner over <paramref name="source"/>.</returns>
    public static OrderablePartitioner<TSource> Create<TSource>(
        IEnumerable<TSource> source, Func<int, int> nextChunkSizeFunc)
    {
        OrderablePartitioner<TSource> partitioner =
            new ChunkPartitioner<TSource>(source, nextChunkSizeFunc);
        return partitioner;
    }

    /// <summary>Builds a partitioner that always requests the same chunk size.</summary>
    /// <typeparam name="TSource">The element type of the data being partitioned.</typeparam>
    /// <param name="source">The data to partition.</param>
    /// <param name="chunkSize">The chunk size to use for every request.</param>
    /// <returns>A partitioner over <paramref name="source"/>.</returns>
    public static OrderablePartitioner<TSource> Create<TSource>(
        IEnumerable<TSource> source, int chunkSize)
    {
        OrderablePartitioner<TSource> partitioner =
            new ChunkPartitioner<TSource>(source, chunkSize);
        return partitioner;
    }

    /// <summary>Builds a partitioner whose chunk sizes range between a minimum and a maximum.</summary>
    /// <typeparam name="TSource">The element type of the data being partitioned.</typeparam>
    /// <param name="source">The data to partition.</param>
    /// <param name="minChunkSize">The smallest chunk size to use.</param>
    /// <param name="maxChunkSize">The largest chunk size to use.</param>
    /// <returns>A partitioner over <paramref name="source"/>.</returns>
    public static OrderablePartitioner<TSource> Create<TSource>(
        IEnumerable<TSource> source, int minChunkSize, int maxChunkSize)
    {
        OrderablePartitioner<TSource> partitioner =
            new ChunkPartitioner<TSource>(source, minChunkSize, maxChunkSize);
        return partitioner;
    }
}
/// <summary>
/// Partitions an enumerable into chunks based on user-supplied criteria.
/// </summary>
/// <typeparam name="T">The type of the data being partitioned.</typeparam>
internal sealed class ChunkPartitioner<T> : OrderablePartitioner<T>
{
    private readonly IEnumerable<T> _source;
    private readonly Func<int, int> _nextChunkSizeFunc;

    /// <summary>Initializes the partitioner with a function that selects each chunk size.</summary>
    /// <param name="source">The data being partitioned.</param>
    /// <param name="nextChunkSizeFunc">
    /// Receives the previously requested chunk size (0 on an enumerator's first request) and
    /// returns the next chunk size, which must be positive.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="nextChunkSizeFunc"/> is null.
    /// </exception>
    public ChunkPartitioner(IEnumerable<T> source, Func<int, int> nextChunkSizeFunc)
        // The keys will be ordered across both individual partitions and across partitions,
        // and they will be normalized.
        : base(true, true, true)
    {
        // Validate and store the enumerable and function (used to determine how big
        // to make the next chunk given the current chunk size)
        if (source == null) throw new ArgumentNullException("source");
        if (nextChunkSizeFunc == null) throw new ArgumentNullException("nextChunkSizeFunc");

        // Both fields must be stored: _source is read whenever a set of partitions is created.
        _source = source;
        _nextChunkSizeFunc = nextChunkSizeFunc;
    }

    /// <summary>Initializes the partitioner to always request the same chunk size.</summary>
    /// <param name="source">The data being partitioned.</param>
    /// <param name="chunkSize">The chunk size to be used; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is not positive.</exception>
    public ChunkPartitioner(IEnumerable<T> source, int chunkSize)
        : this(source, prev => chunkSize) // uses a function that always returns the specified chunk size
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException("chunkSize");
    }

    /// <summary>Initializes the partitioner to use chunk sizes that grow from a minimum to a maximum.</summary>
    /// <param name="source">The data being partitioned.</param>
    /// <param name="minChunkSize">The minimum chunk size to use; must be positive.</param>
    /// <param name="maxChunkSize">The maximum chunk size to use; must not be less than the minimum.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="minChunkSize"/> is not positive or exceeds <paramref name="maxChunkSize"/>.
    /// </exception>
    public ChunkPartitioner(IEnumerable<T> source, int minChunkSize, int maxChunkSize) :
        this(source, CreateFuncFromMinAndMax(minChunkSize, maxChunkSize)) // uses a function that grows from min to max
    {
        if (minChunkSize <= 0 ||
            minChunkSize > maxChunkSize) throw new ArgumentOutOfRangeException("minChunkSize");
    }

    /// <summary>
    /// Creates a function that returns exponentially growing chunk sizes between
    /// <paramref name="minChunkSize"/> and <paramref name="maxChunkSize"/>.
    /// </summary>
    private static Func<int, int> CreateFuncFromMinAndMax(int minChunkSize, int maxChunkSize)
    {
        return delegate(int prev)
        {
            if (prev < minChunkSize) return minChunkSize;
            if (prev >= maxChunkSize) return maxChunkSize;

            // Double the previous size. The multiplication is explicitly unchecked so that an
            // overflow wraps to a negative value, which the test below clamps to the maximum.
            int next = unchecked(prev * 2);
            if (next >= maxChunkSize || next < 0) return maxChunkSize;
            return next;
        };
    }

    /// <summary>
    /// Partitions the underlying collection into the specified number of orderable partitions.
    /// </summary>
    /// <param name="partitionCount">The number of partitions to create.</param>
    /// <returns>A list containing <paramref name="partitionCount"/> enumerators.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="partitionCount"/> is not positive.</exception>
    public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int partitionCount)
    {
        // Validate parameters
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException("partitionCount");

        // Create an array of dynamic partitions and return them. Reference counting is enabled
        // so that the shared source enumerator is disposed once every partition has been disposed.
        var partitions = new IEnumerator<KeyValuePair<long, T>>[partitionCount];
        var dynamicPartitions = GetOrderableDynamicPartitions(true);
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = dynamicPartitions.GetEnumerator(); // Create and store the next partition
        }
        return partitions;
    }

    /// <summary>Gets whether additional partitions can be created dynamically.</summary>
    public override bool SupportsDynamicPartitions { get { return true; } }

    /// <summary>
    /// Creates an object that can partition the underlying collection into a variable number of
    /// partitions.
    /// </summary>
    /// <returns>
    /// An object that can create partitions over the underlying data source.
    /// </returns>
    public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
    {
        return new EnumerableOfEnumerators(this, false);
    }

    /// <summary>Creates the partition source, optionally reference counting its enumerators.</summary>
    /// <param name="referenceCountForDisposal">
    /// When true, the shared source enumerator is disposed when the last partition enumerator is
    /// disposed; when false, it is disposed when the returned object itself is disposed.
    /// </param>
    private IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions(bool referenceCountForDisposal)
    {
        return new EnumerableOfEnumerators(this, referenceCountForDisposal);
    }

    /// <summary>The object used to dynamically create partitions over one shared source enumerator.</summary>
    private sealed class EnumerableOfEnumerators : IEnumerable<KeyValuePair<long, T>>, IDisposable
    {
        private readonly ChunkPartitioner<T> _parentPartitioner;
        private readonly object _sharedLock = new object(); // guards the three shared fields below
        private readonly IEnumerator<T> _sharedEnumerator;
        private long _nextSharedIndex;
        private int _activeEnumerators;
        private bool _noMoreElements;
        private bool _disposed;
        private readonly bool _referenceCountForDisposal;

        /// <summary>Creates the shared enumerator over the parent partitioner's source.</summary>
        /// <param name="parentPartitioner">The partitioner that supplies the source and chunk size function.</param>
        /// <param name="referenceCountForDisposal">Whether partition enumerators are reference counted.</param>
        /// <exception cref="ArgumentNullException"><paramref name="parentPartitioner"/> is null.</exception>
        public EnumerableOfEnumerators(ChunkPartitioner<T> parentPartitioner, bool referenceCountForDisposal)
        {
            // Validate parameters
            if (parentPartitioner == null) throw new ArgumentNullException("parentPartitioner");

            // Store the data, including creating an enumerator from the underlying data source
            _parentPartitioner = parentPartitioner;
            _sharedEnumerator = parentPartitioner._source.GetEnumerator();
            _nextSharedIndex = -1; // pre-incremented before use, so the first key handed out is 0
            _referenceCountForDisposal = referenceCountForDisposal;
        }

        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

        /// <summary>Creates a new partition enumerator that pulls chunks from the shared source.</summary>
        public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
        {
            if (_referenceCountForDisposal)
            {
                Interlocked.Increment(ref _activeEnumerators);
            }
            return new Enumerator(this);
        }

        /// <summary>
        /// Called when a partition enumerator is disposed. With reference counting enabled, the
        /// shared enumerator is disposed when the count of active enumerators reaches zero.
        /// </summary>
        private void DisposeEnumerator(Enumerator enumerator)
        {
            if (_referenceCountForDisposal)
            {
                if (Interlocked.Decrement(ref _activeEnumerators) == 0)
                {
                    _sharedEnumerator.Dispose();
                }
            }
        }

        /// <summary>A single partition: buffers one chunk at a time taken from the shared enumerator.</summary>
        private sealed class Enumerator : IEnumerator<KeyValuePair<long, T>>
        {
            private readonly EnumerableOfEnumerators _parentEnumerable;
            private readonly List<KeyValuePair<long, T>> _currentChunk = new List<KeyValuePair<long, T>>();
            private int _currentChunkCurrentIndex;
            private int _lastRequestedChunkSize;
            private bool _disposed;

            public Enumerator(EnumerableOfEnumerators parentEnumerable)
            {
                if (parentEnumerable == null) throw new ArgumentNullException("parentEnumerable");
                _parentEnumerable = parentEnumerable;
            }

            /// <summary>Advances to the next element, fetching a new chunk when the current one is used up.</summary>
            /// <returns>true if an element is available; false if the source has no more elements.</returns>
            /// <exception cref="ObjectDisposedException">The enumerator has been disposed.</exception>
            /// <exception cref="InvalidOperationException">The chunk size function returned a non-positive size.</exception>
            public bool MoveNext()
            {
                if (_disposed) throw new ObjectDisposedException(GetType().Name);

                // Move to the next cached element. If we already retrieved a chunk and if there's still
                // data left in it, just use the next item from it.
                ++_currentChunkCurrentIndex;
                if (_currentChunkCurrentIndex >= 0 &&
                    _currentChunkCurrentIndex < _currentChunk.Count) return true;

                // First, figure out how much new data we want. The previous requested chunk size is used
                // as input to figure out how much data the user now wants. The initial chunk size
                // supplied is 0 so that the user delegate is made aware that this is the initial request
                // such that it can select the initial chunk size on first request.
                int nextChunkSize = _parentEnumerable._parentPartitioner._nextChunkSizeFunc(_lastRequestedChunkSize);
                if (nextChunkSize <= 0) throw new InvalidOperationException(
                    "Invalid chunk size requested: chunk sizes must be positive.");
                _lastRequestedChunkSize = nextChunkSize;

                // Reset the local buffer and make sure it can hold the requested chunk
                _currentChunk.Clear();
                _currentChunkCurrentIndex = 0;
                if (nextChunkSize > _currentChunk.Capacity) _currentChunk.Capacity = nextChunkSize;

                // Try to grab the next chunk of data. The dedicated lock object is used rather than
                // the shared enumerator, which comes from the caller's source and is not ours to lock on.
                lock (_parentEnumerable._sharedLock)
                {
                    // If we've already discovered that no more elements exist (and we've gotten this
                    // far, which means we don't have any elements cached), we're done.
                    if (_parentEnumerable._noMoreElements) return false;

                    // Pull up to nextChunkSize elements from the shared enumerator
                    for (int i = 0; i < nextChunkSize; i++)
                    {
                        // If there are no more elements to be retrieved from the shared enumerator, mark
                        // that so that other partitions don't have to check again. Return whether we
                        // were able to retrieve any data at all.
                        if (!_parentEnumerable._sharedEnumerator.MoveNext())
                        {
                            _parentEnumerable._noMoreElements = true;
                            return _currentChunk.Count > 0;
                        }

                        ++_parentEnumerable._nextSharedIndex;
                        _currentChunk.Add(new KeyValuePair<long, T>(
                            _parentEnumerable._nextSharedIndex,
                            _parentEnumerable._sharedEnumerator.Current));
                    }
                }

                // We got at least some data
                return true;
            }

            /// <summary>Gets the current element paired with its index in the source sequence.</summary>
            /// <exception cref="InvalidOperationException">There is no current item.</exception>
            public KeyValuePair<long, T> Current
            {
                get
                {
                    if (_currentChunkCurrentIndex >= _currentChunk.Count)
                    {
                        throw new InvalidOperationException("There is no current item.");
                    }
                    return _currentChunk[_currentChunkCurrentIndex];
                }
            }

            /// <summary>Releases this partition. Safe to call more than once.</summary>
            public void Dispose()
            {
                // Only notify the parent once; a repeated Dispose must not decrement the
                // reference count a second time.
                if (!_disposed)
                {
                    _parentEnumerable.DisposeEnumerator(this);
                    _disposed = true;
                }
            }

            object IEnumerator.Current { get { return Current; } }

            public void Reset() { throw new NotSupportedException(); }
        }

        /// <summary>
        /// Disposes the shared enumerator when reference counting is not in use (with reference
        /// counting, the last partition enumerator disposes it instead). Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (!_disposed)
            {
                if (!_referenceCountForDisposal) _sharedEnumerator.Dispose();
                _disposed = true;
            }
        }
    }
}